/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments. It supports
 * pushing a segment at the head of the pipeline and removing a segment from the tail when it is
 * flushed to disk. It also supports swap method to allow the in-memory compaction swap a subset of
 * the segments at the tail of the pipeline with a new (compacted) one. This swap succeeds only if
 * the version number passed with the list of segments to swap is the same as the current version of
 * the pipeline. Essentially, there are two methods which can change the structure of the pipeline:
 * pushHead() and swap(), the later is used both by a flush to disk and by an in-memory compaction.
 * The pipeline version is updated by swap(); it allows to identify conflicting operations at the
 * suffix of the pipeline. The synchronization model is copy-on-write. Methods which change the
 * structure of the pipeline (pushHead(), flattenOneSegment() and swap()) apply their changes in the
 * context of a lock. They also make a read-only copy of the pipeline's list. Read methods read from
 * a read-only copy. If a read method accesses the read-only copy more than once it makes a local
 * copy of it to ensure it accesses the same copy. The methods getVersionedList(),
 * getVersionedTail(), and flattenOneSegment() are also protected by a lock since they need to have
 * a consistent (atomic) view of the pipeline list and version number.
 */
@InterfaceAudience.Private
public class CompactionPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(CompactionPipeline.class);

  public final static long FIXED_OVERHEAD =
    ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST);

  private final RegionServicesForStores region;
  private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
  // The list is volatile to avoid reading a new allocated reference before the c'tor is executed
  private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
  /**
   * <pre>
   * Version is volatile to ensure it is atomically read when not using a lock.
   * To indicate whether the suffix of pipeline changes:
   * 1.for {@link CompactionPipeline#pushHead(MutableSegment)},new {@link ImmutableSegment} only
   *   added at Head, {@link #version} not change.
   * 2.for {@link CompactionPipeline#swap},{@link #version} increase.
   * 3.for {@link CompactionPipeline#replaceAtIndex},{@link #version} increase.
   * </pre>
   */
  private volatile long version = 0;

  public CompactionPipeline(RegionServicesForStores region) {
    this.region = region;
  }

  public boolean pushHead(MutableSegment segment) {
    // Record the ImmutableSegment' heap overhead when initialing
    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
    ImmutableSegment immutableSegment =
      SegmentFactory.instance().createImmutableSegment(segment, memstoreAccounting);
    if (region != null) {
      region.addMemStoreSize(memstoreAccounting.getDataSize(), memstoreAccounting.getHeapSize(),
        memstoreAccounting.getOffHeapSize(), memstoreAccounting.getCellsCount());
    }
    synchronized (pipeline) {
      boolean res = addFirst(immutableSegment);
      readOnlyCopy = new LinkedList<>(pipeline);
      return res;
    }
  }

  public VersionedSegmentsList getVersionedList() {
    synchronized (pipeline) {
      return new VersionedSegmentsList(readOnlyCopy, version);
    }
  }

  public VersionedSegmentsList getVersionedTail() {
    synchronized (pipeline) {
      List<ImmutableSegment> segmentList = new ArrayList<>();
      if (!pipeline.isEmpty()) {
        segmentList.add(0, pipeline.getLast());
      }
      return new VersionedSegmentsList(segmentList, version);
    }
  }

  /**
   * Swaps the versioned list at the tail of the pipeline with a new segment. Swapping only if there
   * were no changes to the suffix of the list since the version list was created.
   * @param versionedList    suffix of the pipeline to be replaced can be tail or all the pipeline
   * @param segment          new segment to replace the suffix. Can be null if the suffix just needs
   *                         to be removed.
   * @param closeSuffix      whether to close the suffix (to release memory), as part of swapping it
   *                         out During index merge op this will be false and for compaction it will
   *                         be true.
   * @param updateRegionSize whether to update the region size. Update the region size, when the
   *                         pipeline is swapped as part of in-memory-flush and further
   *                         merge/compaction. Don't update the region size when the swap is result
   *                         of the snapshot (flush-to-disk).
   * @return true iff swapped tail with new segment
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
      justification = "Increment is done under a synchronize block so safe")
  public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
    boolean closeSuffix, boolean updateRegionSize) {
    if (versionedList.getVersion() != version) {
      return false;
    }
    List<ImmutableSegment> suffix;
    synchronized (pipeline) {
      if (versionedList.getVersion() != version) {
        return false;
      }
      suffix = versionedList.getStoreSegments();
      LOG.debug("Swapping pipeline suffix; before={}, new segment={}",
        versionedList.getStoreSegments().size(), segment);
      swapSuffix(suffix, segment, closeSuffix);
      readOnlyCopy = new LinkedList<>(pipeline);
      version++;
    }
    if (updateRegionSize && region != null) {
      // update the global memstore size counter
      long suffixDataSize = getSegmentsKeySize(suffix);
      long suffixHeapSize = getSegmentsHeapSize(suffix);
      long suffixOffHeapSize = getSegmentsOffHeapSize(suffix);
      int suffixCellsCount = getSegmentsCellsCount(suffix);
      long newDataSize = 0;
      long newHeapSize = 0;
      long newOffHeapSize = 0;
      int newCellsCount = 0;
      if (segment != null) {
        newDataSize = segment.getDataSize();
        newHeapSize = segment.getHeapSize();
        newOffHeapSize = segment.getOffHeapSize();
        newCellsCount = segment.getCellsCount();
      }
      long dataSizeDelta = suffixDataSize - newDataSize;
      long heapSizeDelta = suffixHeapSize - newHeapSize;
      long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
      int cellsCountDelta = suffixCellsCount - newCellsCount;
      region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta, -cellsCountDelta);
      LOG.debug(
        "Suffix data size={}, new segment data size={}, suffix heap size={},new segment heap "
          + "size={} 　suffix off heap size={}, new segment off heap size={}, suffix cells "
          + "count={}, new segment cells count={}",
        suffixDataSize, newDataSize, suffixHeapSize, newHeapSize, suffixOffHeapSize, newOffHeapSize,
        suffixCellsCount, newCellsCount);
    }
    return true;
  }

  private static long getSegmentsHeapSize(List<? extends Segment> list) {
    long res = 0;
    for (Segment segment : list) {
      res += segment.getHeapSize();
    }
    return res;
  }

  private static long getSegmentsOffHeapSize(List<? extends Segment> list) {
    long res = 0;
    for (Segment segment : list) {
      res += segment.getOffHeapSize();
    }
    return res;
  }

  private static long getSegmentsKeySize(List<? extends Segment> list) {
    long res = 0;
    for (Segment segment : list) {
      res += segment.getDataSize();
    }
    return res;
  }

  private static int getSegmentsCellsCount(List<? extends Segment> list) {
    int res = 0;
    for (Segment segment : list) {
      res += segment.getCellsCount();
    }
    return res;
  }

  /**
   * If the caller holds the current version, go over the the pipeline and try to flatten each
   * segment. Flattening is replacing the ConcurrentSkipListMap based CellSet to CellArrayMap based.
   * Flattening of the segment that initially is not based on ConcurrentSkipListMap has no effect.
   * Return after one segment is successfully flatten.
   * @return true iff a segment was successfully flattened
   */
  public boolean flattenOneSegment(long requesterVersion, CompactingMemStore.IndexType idxType,
    MemStoreCompactionStrategy.Action action) {

    if (requesterVersion != version) {
      LOG.warn("Segment flattening failed, because versions do not match. Requester version: "
        + requesterVersion + ", actual version: " + version);
      return false;
    }

    synchronized (pipeline) {
      if (requesterVersion != version) {
        LOG.warn("Segment flattening failed, because versions do not match");
        return false;
      }
      int i = -1;
      for (ImmutableSegment s : pipeline) {
        i++;
        if (s.canBeFlattened()) {
          s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
          if (s.isEmpty()) {
            // after s.waitForUpdates() is called, there is no updates pending,if no cells in s,
            // we can skip it.
            continue;
          }
          // size to be updated
          MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
          ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
            (CSLMImmutableSegment) s, idxType, newMemstoreAccounting, action);
          replaceAtIndex(i, newS);
          if (region != null) {
            // Update the global memstore size counter upon flattening there is no change in the
            // data size
            MemStoreSize mss = newMemstoreAccounting.getMemStoreSize();
            region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
              mss.getCellsCount());
          }
          LOG.debug("Compaction pipeline segment {} flattened", s);
          return true;
        }
      }
    }
    // do not update the global memstore size counter and do not increase the version,
    // because all the cells remain in place
    return false;
  }

  public boolean isEmpty() {
    return readOnlyCopy.isEmpty();
  }

  public List<? extends Segment> getSegments() {
    return readOnlyCopy;
  }

  public long size() {
    return readOnlyCopy.size();
  }

  public long getMinSequenceId() {
    long minSequenceId = Long.MAX_VALUE;
    LinkedList<? extends Segment> localCopy = readOnlyCopy;
    if (!localCopy.isEmpty()) {
      minSequenceId = localCopy.getLast().getMinSequenceId();
    }
    return minSequenceId;
  }

  public MemStoreSize getTailSize() {
    LinkedList<? extends Segment> localCopy = readOnlyCopy;
    return localCopy.isEmpty() ? new MemStoreSize() : localCopy.peekLast().getMemStoreSize();
  }

  public MemStoreSize getPipelineSize() {
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    LinkedList<? extends Segment> localCopy = readOnlyCopy;
    for (Segment segment : localCopy) {
      memStoreSizing.incMemStoreSize(segment.getMemStoreSize());
    }
    return memStoreSizing.getMemStoreSize();
  }

  /**
   * Must be called under the {@link CompactionPipeline#pipeline} Lock.
   */
  private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
    boolean closeSegmentsInSuffix) {
    matchAndRemoveSuffixFromPipeline(suffix);
    if (segment != null) {
      pipeline.addLast(segment);
    }
    // During index merge we won't be closing the segments undergoing the merge. Segment#close()
    // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
    // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
    // compaction, we would have copied the cells data from old MSLAB chunks into a new chunk
    // created for the result segment. So we can release the chunks associated with the compacted
    // segments.
    if (closeSegmentsInSuffix) {
      for (Segment itemInSuffix : suffix) {
        itemInSuffix.close();
      }
    }
  }

  /**
   * Checking that the {@link Segment}s in suffix input parameter is same as the {@link Segment}s in
   * {@link CompactionPipeline#pipeline} one by one from the last element to the first element of
   * suffix. If matched, remove suffix from {@link CompactionPipeline#pipeline}. <br/>
   * Must be called under the {@link CompactionPipeline#pipeline} Lock.
   */
  private void matchAndRemoveSuffixFromPipeline(List<? extends Segment> suffix) {
    if (suffix.isEmpty()) {
      return;
    }
    if (pipeline.size() < suffix.size()) {
      throw new IllegalStateException(
        "CODE-BUG:pipleine size:[" + pipeline.size() + "],suffix size:[" + suffix.size()
          + "],pipeline size must greater than or equals suffix size");
    }

    ListIterator<? extends Segment> suffixIterator = suffix.listIterator(suffix.size());
    ListIterator<? extends Segment> pipelineIterator = pipeline.listIterator(pipeline.size());
    int count = 0;
    while (suffixIterator.hasPrevious()) {
      Segment suffixSegment = suffixIterator.previous();
      Segment pipelineSegment = pipelineIterator.previous();
      if (suffixSegment != pipelineSegment) {
        throw new IllegalStateException("CODE-BUG:suffix last:[" + count + "]" + suffixSegment
          + " is not pipleline segment:[" + pipelineSegment + "]");
      }
      count++;
    }

    for (int index = 1; index <= count; index++) {
      pipeline.pollLast();
    }

  }

  // replacing one segment in the pipeline with a new one exactly at the same index
  // need to be called only within synchronized block
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
      justification = "replaceAtIndex is invoked under a synchronize block so safe")
  private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
    pipeline.set(idx, newSegment);
    readOnlyCopy = new LinkedList<>(pipeline);
    // the version increment is indeed needed, because the swap uses removeAll() method of the
    // linked-list that compares the objects to find what to remove.
    // The flattening changes the segment object completely (creation pattern) and so
    // swap will not proceed correctly after concurrent flattening.
    version++;
  }

  public Segment getTail() {
    List<? extends Segment> localCopy = getSegments();
    if (localCopy.isEmpty()) {
      return null;
    }
    return localCopy.get(localCopy.size() - 1);
  }

  private boolean addFirst(ImmutableSegment segment) {
    pipeline.addFirst(segment);
    return true;
  }

  // debug method
  private boolean validateSuffixList(LinkedList<ImmutableSegment> suffix) {
    if (suffix.isEmpty()) {
      // empty suffix is always valid
      return true;
    }
    Iterator<ImmutableSegment> pipelineBackwardIterator = pipeline.descendingIterator();
    Iterator<ImmutableSegment> suffixBackwardIterator = suffix.descendingIterator();
    ImmutableSegment suffixCurrent;
    ImmutableSegment pipelineCurrent;
    for (; suffixBackwardIterator.hasNext();) {
      if (!pipelineBackwardIterator.hasNext()) {
        // a suffix longer than pipeline is invalid
        return false;
      }
      suffixCurrent = suffixBackwardIterator.next();
      pipelineCurrent = pipelineBackwardIterator.next();
      if (suffixCurrent != pipelineCurrent) {
        // non-matching suffix
        return false;
      }
    }
    // suffix matches pipeline suffix
    return true;
  }

}
